#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "lich_ctx.h"
#include "coroutine.h"
#include "limits.h"
#include "adt.h"
#include "sysy_lib.h"
#include "bmap.h"
#include "net_table.h"
#include "configure.h"
#include "table_proto.h"
#include "volume_proto.h"
#include "metadata.h"
#include "core.h"
#include "md_proto.h"
#include "md_map.h"
#include "net_global.h"
#include "volume_ctl.h"
#include "../storage/stor_rpc.h"
#include "volume.h"
#include "ramdisk.h"
#include "job_dock.h"
#include "ylog.h"
#include "dbg.h"
#include "volume_proto_eclog.h"

#include "iscsi.h"
#include "bh_task.h"

/*
 * Unlink every node from @list and release it with yfree().
 *
 * The pointer handed to yfree() is the list_head itself, so the list_head
 * has to sit at the very start of each node's allocation (code in this file
 * casts a list_head straight to snap_t). Anything the node points to is not
 * released here.
 */
static inline void __snapshot_list_free(struct list_head *list)
{
        struct list_head *cur, *next;

        /* use the _safe iterator because the current node is freed in the body */
        list_for_each_safe(cur, next, list) {
                list_del(cur);
                yfree((void **)&cur);
        }
}

/**
 * Split one volume-level I/O into a sequence of per-chunk I/Os.
 *
 * Every produced segment stays inside one split unit (LICH_CHUNK_SPLIT, or
 * ec->k * LICH_CHUNK_SPLIT for an EC volume that is not doing an ECLOG I/O),
 * carries an offset relative to that unit, and gets its chunk id from
 * fid2cid() based on the volume offset.
 *
 * @param volume_proto volume controller; supplies the lease sequence, the
 *                     file attributes and the EC descriptor
 * @param _io          the I/O to split (id, offset, size, flags)
 * @param buf          optional payload; when non-NULL its length must equal
 *                     _io->size, and it is handed to mbuffer_pop() segment
 *                     by segment to fill each segment's buffer
 * @param ios          output array, filled from index 0. The bound
 *                     (LICH_SPLIT_MAX) is only asserted after the array has
 *                     been written, so the caller must size it accordingly
 * @param _segs        out: number of segments written to @ios
 * @return 0 on success, EINVAL when _io->size exceeds
 *         LICH_IO_MAX * LICH_SPLIT_MAX
 *
 * @note A zero-sized @_io produces no segment and trips YASSERT(split > 0).
 */
int volume_proto_io_split(volume_proto_t *volume_proto, const io_t *_io,
                buffer_t *buf, chunk_io_t *ios, int *_segs)
{
        int ret, i, split;
        chunk_io_t *io;
        off_t offset;
        size_t left, split_size;
        ec_t *ec;

        if (buf)
                YASSERT(buf->len == _io->size);

        if (unlikely(_io->size > LICH_IO_MAX * LICH_SPLIT_MAX)) {
                DWARN("count %llu\n", (LLU)_io->size);
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        /* pick the split unit */
#if ENABLE_EC
        ec = &volume_proto->table1.fileinfo.ec;
        if (EC_ISEC(ec)) {
#if ECLOG_ENABLE
                if (_io->flags & __FILE_ATTR_ECLOG__)
                        split_size = LICH_CHUNK_SPLIT;
                else
                        split_size = ec->k * LICH_CHUNK_SPLIT;
#else /* ECLOG_ENABLE */
                split_size = ec->k * LICH_CHUNK_SPLIT;
#endif /* ECLOG_ENABLE */
        } else {
                split_size = LICH_CHUNK_SPLIT;
        }
#else /* ENABLE_EC */
        (void) ec;
        split_size = LICH_CHUNK_SPLIT;
#endif /* ENABLE_EC */

        left = _io->size;
        offset = _io->offset;
        split = 0;
        for (i = 0; left > 0; i++) {
                io = &ios[i];

                io->io.lease = volume_proto->lease.token.seq;
                /* offset inside the split unit; size is clipped at the unit's end */
                io->io.offset = offset % split_size;
                io->io.size = (io->io.offset + left) < split_size ?
                        left : (split_size - io->io.offset);

                /*
                 * Direct I/O never uses writeback; otherwise the volume's own
                 * writeback attribute is merged into the request flags.
                 */
                if (_io->flags & __FILE_ATTR_DIRECT__)
                        io->io.flags = _io->flags & (~__FILE_ATTR_WRITEBACK__);
                else
                        io->io.flags = _io->flags | (volume_proto->table1.fileinfo.attr & __FILE_ATTR_WRITEBACK__);

#if ECLOG_ENABLE
                YASSERT(ECLOG_DATA_SIZE_ALIGN(ec) % LICH_CHUNK_SPLIT == 0);
                YASSERT(ECLOG_LOG_SIZE_ALIGN(ec) % LICH_CHUNK_SPLIT == 0);

                if (EC_ISEC(ec)) {
                        if (_io->flags & __FILE_ATTR_ECLOG__) {
                                /* log I/O: index inside the section the offset falls in */
                                fid2cid(&io->io.id, &_io->id,
                                                (offset / ECLOG_SECTION_SIZE_ALIGN(ec)) *
                                                ECLOG_SECTION_CHUNK_COUNT(ec) +
                                                ((offset % ECLOG_SECTION_SIZE_ALIGN(ec)) / split_size));
                        } else {
                                /* data I/O: skip the log chunks that precede the data */
                                fid2cid(&io->io.id, &_io->id,
                                                ((offset / ECLOG_DATA_SIZE_ALIGN(ec)) + 1) *
                                                ECLOG_LOG_CHUNK_COUNT(ec) +
                                                (offset / split_size));
                        }
                } else {
                        fid2cid(&io->io.id, &_io->id, offset / split_size);
                }
#else
                fid2cid(&io->io.id, &_io->id, offset / split_size);
#endif

                mbuffer_init(&io->buf, 0);
                /* point the typed views at the segment's embedded storage */
                io->chkinfo = (void *)io->__chkinfo__;
                io->chkstat = (void *)io->__chkstat__;
                io->vfm = (void *)io->__vfm__;
                left -= io->io.size;
                offset += io->io.size;
                split++;
        }

        if (split > 1) {
                YASSERT(split <= LICH_SPLIT_MAX);
        }

        YASSERT(split > 0);

        *_segs = split;

        if (buf) {
                for (i = 0; i < split; i++) {
                        io = &ios[i];
                        ret = mbuffer_pop(buf, &io->buf, io->io.size);
                        if (unlikely(ret))
                                YASSERT(0);

                        /* the popped buffer must hold exactly this segment's bytes */
                        YASSERT(io->buf.len == io->io.size);
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Extend the volume's table1/table2 so they cover the end of @io.
 *
 * The chunk count is derived from io->offset + io->size via size2chknum();
 * table1 is extended to chknum / FILE_PROTO_EXTERN_ITEM_COUNT entries (at
 * least one), table2 to chknum entries with __OP_WRITE.
 *
 * @return 0 on success, otherwise the error returned by either extend call
 */
int volume_proto_extend(volume_proto_t *volume_proto, const io_t *io, const ec_t *ec)
{
        int ret, chknum, table_count;
        table1_t *table1 = &volume_proto->table1;
        table2_t *table2 = &volume_proto->table2;

        chknum = size2chknum(io->offset + io->size, ec);

        /* integer division rounds down; never ask for fewer than one table */
        table_count = chknum / FILE_PROTO_EXTERN_ITEM_COUNT;
        if (table_count == 0)
                table_count = 1;

        ret = table1->extend(table1, table_count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table2->extend(table2, chknum, __OP_WRITE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Write @buf to the volume at the range described by @io.
 *
 * The volume tables are extended first so the target range is covered; the
 * write then goes through the EC path for EC volumes (only when ENABLE_EC is
 * set) and through the replica path otherwise.
 *
 * @return 0 on success, otherwise the error from the extend or write step
 */
int volume_proto_write_raw(volume_proto_t *volume_proto, io_opt_t *io_opt, const io_t *io, const buffer_t *buf)
{
        int ret;
        ec_t *ec = &volume_proto->table1.fileinfo.ec;

        ret = volume_proto_extend(volume_proto, io, ec);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if ENABLE_EC
        if (EC_ISEC(ec))
                ret = volume_proto_ec_write(volume_proto, io_opt, io, buf);
        else
#endif
                ret = volume_proto_rep_write(volume_proto, io_opt, io, buf);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Per-chunk argument block for volume_proto_discard_callback().
 * Several of these can share one cond / task_count pair so that a waiter is
 * woken once the last task has finished.
 */
typedef struct
{
        co_cond_t *cond;                /* broadcast by the callback when *task_count reaches 0 */
        volume_proto_t *volume_proto;   /* volume passed to volume_proto_discard() */
        const chkid_t *chkid;           /* chunk passed to volume_proto_discard() */
        plock_t lock;                   /* only referenced from commented-out code in this file */
        int     *task_count;            /* shared counter of outstanding tasks, decremented by the callback */
        int     ret;                    /* out: return value of volume_proto_discard() */
} discard_param_t;

/**
 * Task body for one asynchronous chunk discard.
 *
 * Runs volume_proto_discard() for the chunk described by @arg, stores the
 * result in the parameter block, decrements the shared task counter and
 * broadcasts on the shared condition once the counter hits zero.
 *
 * @param arg a discard_param_t *
 */
void volume_proto_discard_callback(void *arg)
{
        discard_param_t *ctx = (discard_param_t *)arg;
        int *pending;

        ctx->ret = volume_proto_discard(ctx->volume_proto, ctx->chkid);

        DBUG("ummap return, %d\r\n", ctx->ret);

        pending = ctx->task_count;
        *pending -= 1;

        /* last task out wakes whoever is waiting on the condition */
        if (*pending == 0)
                co_cond_broadcast(ctx->cond, 0);
}

/**
 * Queue volume_proto_discard_callback() for @param as a new scheduler task.
 *
 * The result of the discard is reported through param->ret by the callback;
 * this function itself always returns 0.
 */
int volume_proto_discard_async(discard_param_t *param)
{
        schedule_task_new("volume_proto_discard",
                          volume_proto_discard_callback,
                          param,
                          -1);

        return 0;
}

/**
 * Cut the range of @_io at LICH_CHUNK_SPLIT boundaries.
 *
 * Each segment stores the absolute volume offset (not a chunk-relative one),
 * the number of bytes up to the next boundary or the end of the range, and
 * the chunk id for that offset. Buffers, lease and flags are left untouched.
 *
 * @param _io   range to split (id, offset, size)
 * @param attr  unused
 * @param ios   output array, filled from index 0; must be large enough
 * @param _segs out: number of segments written
 * @return always 0; a zero-sized range trips the YASSERT
 */
STATIC int __volume_proto_lba_split(const io_t *_io, uint32_t attr, chunk_io_t *ios, int *_segs)
{
        int count = 0;
        chunk_io_t *seg;
        off_t pos = _io->offset;
        size_t remain = _io->size;

        (void) attr;

        while (remain > 0) {
                seg = &ios[count];

                seg->io.offset = pos;
                seg->io.size = min(remain, LICH_CHUNK_SPLIT - pos % LICH_CHUNK_SPLIT);

                fid2cid(&seg->io.id, &_io->id, pos / LICH_CHUNK_SPLIT);

                /* point the typed views at the segment's embedded storage */
                seg->chkinfo = (void *)seg->__chkinfo__;
                seg->chkstat = (void *)seg->__chkstat__;

                remain -= seg->io.size;
                pos += seg->io.size;
                count++;
        }

        YASSERT(count > 0);

        *_segs = count;

        return 0;
}

/**
 * Discard (unmap) the chunks that are fully covered by the range of @io.
 *
 * The range is cut at LICH_CHUNK_SPLIT boundaries and only segments that
 * span a whole chunk are handed to volume_proto_batch_discard(); partial
 * head/tail segments are ignored.
 *
 * Behaviour worth knowing:
 *  - Calls are serialized through a function-local static flag; a second
 *    caller polls with schedule_sleep() until the first one has left.
 *  - When snapshot_isempty() does not report "empty" the call is a no-op
 *    and returns 0.
 *  - A failure of volume_proto_batch_discard() is only logged; the function
 *    still returns 0.
 *
 * @param volume_proto volume controller
 * @param io_opt       unused
 * @param io           range to unmap
 * @return 0 on success or when skipped; EPERM if the volume carries
 *         __FILE_ATTR_SNAPSHOT__; ENOMEM or the ymalloc() error on
 *         allocation failure
 */
int volume_proto_unmap_raw(volume_proto_t *volume_proto, io_opt_t *io_opt, const io_t *io)
{
        DBUG("unmap off %ju size %u\n", io->offset, io->size);

        int ret = 0, chunk_count = 0, i, _chunk_count = 0;
        chunk_io_t *chunk_io;//, *tmp;
        int empty = 0;
        static int enter = 0;

        ANALYSIS_BEGIN(0);

        /* one unmap at a time: wait until the previous caller clears the flag */
        while(enter){
                schedule_sleep("unmap", 10);
        }

        enter = 1;

        (void) io_opt;

        volume_proto->snapshot_isempty(volume_proto, &empty);
        if(!empty){
                enter = 0;
                return 0;
        }

        if (unlikely(volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        /* +2 leaves room for a partial segment at each end of the range */
        ret = ymalloc((void **)&chunk_io, sizeof(chunk_io_t) * (io->size / LICH_CHUNK_SPLIT + 2)); //todo. max 1G
        if(unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_proto_lba_split(io,
                                      volume_proto->table1.fileinfo.attr,
                                      chunk_io, &chunk_count);
        if (unlikely(ret)) {
                GOTO(err_split, ret);
        }

#if 0
        discard_param_t * param = malloc(sizeof(discard_param_t) * chunk_count);
        chunk_io_t *tmp;
        int task_count = 0;

        (void)_chunk_count;

        co_cond_t cond;
        co_cond_init(&cond);

        for (i = 0; i < chunk_count; i++) {

                tmp = &chunk_io[i];

                DBUG("unmap, processing %ju, %d\r\n", tmp->io.offset, (uint32_t)tmp->io.size);

                param[i].volume_proto = volume_proto;
                param[i].chkid = &tmp->io.id;

                param[i].task_count = &task_count;
                param[i].cond = &cond;

                task_count++;

                //plock_init(&param[i].lock);
                //plock_wrlock(&param[i].lock);

                volume_proto_discard_async(param + i);
        }

        DBUG("unmap, waiting for finish\r\n");

        //for (i = 0; i < chunk_count; i++) {
        //        plock_wrlock(&param[i].lock);
        //        plock_unlock(&param[i].lock);
        //}

        while (task_count > 0) {
                co_cond_wait2(&cond, "unmap_async");
        }

        free(param);

#else

        /* collect the ids of the segments that cover a complete chunk */
        const chkid_t **chks = malloc(sizeof(*chks) * chunk_count);
        if (unlikely(chks == NULL)) {
                ret = ENOMEM;
                GOTO(err_split, ret);
        }

        for (i = 0; i < chunk_count; i++) {
                if(chunk_io[i].io.size == LICH_CHUNK_SPLIT){
                        chks[_chunk_count] = &chunk_io[i].io.id;
                        _chunk_count ++;
                }
        }

        /* best effort: a failed batch discard is logged, not propagated */
        ret = volume_proto_batch_discard(volume_proto, chks, _chunk_count);
        if(ret)
                DBUG("volume_proto_batch_discard failed, ret=%d\r\n", ret);

        /* chks only points into chunk_io, so release it before chunk_io goes away */
        free(chks);

#endif
        yfree((void **)&chunk_io);

        DBUG("unmap done, off %ju size %u\n", io->offset, io->size);

        enter = 0;

        ANALYSIS_QUEUE(0, IO_WARN, "volume_proto_unmap_raw");

        return 0;

err_split:
        yfree((void **)&chunk_io);
err_ret:
        enter = 0;

        ANALYSIS_QUEUE(0, IO_WARN, "volume_proto_unmap_raw");

        return ret;
}

/**
 * Grow the volume's recorded size so that it covers [offset, offset + size).
 *
 * Nothing happens when the range already lies inside the current size; the
 * volume is never shrunk here. Growth is requested through the volume's
 * setattr() hook with the physical-size selector.
 *
 * @return 0 on success or when no growth is needed, otherwise the setattr()
 *         error
 */
int volume_proto_truncate(volume_proto_t *volume_proto, uint64_t offset, uint32_t size)
{
        int ret;
        fileinfo_t fileinfo;    /* receives setattr() output; not used afterwards */
        setattr_t setattr;
        uint64_t newsize = offset + size;

        if (newsize <= volume_proto->table1.fileinfo.size)
                return 0;

        DINFO("write file "CHKID_FORMAT" newsize %ju oldsize %ju\n",
              CHKID_ARG(&volume_proto->chkid), newsize,
              volume_proto->table1.fileinfo.size);

        memset(&setattr, 0x0, sizeof(setattr));
#if LSV
        setattr.size.set_it = SET_PHYSICAL_SIZE;
#else
        setattr.size.set_it = __SET_PHYSICAL_SIZE;
#endif
        setattr.size.size = newsize;

        ret = volume_proto->setattr(volume_proto, &fileinfo, &setattr);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Forward a table write to the volume's table1.
 *
 * @return whatever table1->table_write() returns
 */
int volume_proto_table_write(volume_proto_t *volume_proto, const chkid_t *chkid,
                const buffer_t *buf, size_t size, off_t offset)
{
        table1_t *tbl = &volume_proto->table1;

        return tbl->table_write(tbl, chkid, buf, size, offset);
}

/**
 * Read the range described by @io from the volume into @buf.
 *
 * The request must address this volume (io->id.id has to match the volume's
 * chkid.id; this is asserted). The read goes through the EC path for EC
 * volumes when ENABLE_EC is set and through the replica path otherwise.
 * With RAMDISK_ENABLE the result is also compared against the ramdisk copy;
 * a mismatch is logged and dumped but does not fail the read.
 *
 * @return 0 on success. When io->offset is at or beyond the volume size, or
 *         io->size is 0: ENOENT for a snapshot volume, EPERM otherwise.
 *         Any other value is the error of the underlying read.
 */
int IO_FUNC volume_proto_read_raw(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf)
{
        int ret;
        ec_t *ec;

        DBUG("read file "CHKID_FORMAT" off %ju size %u\n",
              CHKID_ARG(&volume_proto->chkid), io->offset, io->size);

        /* compare only: this must never modify the volume's own id */
        YASSERT(volume_proto->chkid.id == io->id.id);

        if (unlikely((LLU)io->offset >= (LLU)volume_proto->table1.fileinfo.size || io->size == 0)) {
                // TODO for cloned volume, to read source snapshot
                if (volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__) {
                        ret = ENOENT;
                } else {
                        ret = EPERM;
                }
                GOTO(err_ret, ret);
        }

        ec = &volume_proto->table1.fileinfo.ec;

        ANALYSIS_BEGIN(0);

#if ENABLE_EC
        if (EC_ISEC(ec)) {
                ret = volume_proto_ec_read(volume_proto, io, buf);
        } else {
                ret = volume_proto_rep_read(volume_proto, io, buf);
        }
#else
        (void) ec;
        ret = volume_proto_rep_read(volume_proto, io, buf);
#endif
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if RAMDISK_ENABLE
        buffer_t cmp;

        mbuffer_init(&cmp, 0);
        ret = ramdisk_pread(volume_proto->ramdisk_fd, &cmp, io->size, io->offset);
        if (unlikely(ret))
                GOTO(err_free2, ret);

        if (mbuffer_compare(&cmp, buf)) {
                DWARN("read file "CHKID_FORMAT" size %llu(%d,%d) off %llu not match with ramdisk\n",
                                CHKID_ARG(&volume_proto->chkid), (LLU)io->size, buf->len, cmp.len, (LLU)io->offset);
                char tmp[MAX_INFO_LEN];

                /* bounded formatting so a long prefix cannot overrun tmp */
                snprintf(tmp, sizeof(tmp), "ramdisk buff "CHKID_FORMAT" (%llu, %llu) --> ",
                         CHKID_ARG(&volume_proto->chkid),
                         (LLU)io->offset, (LLU)io->size);

                mbuffer_dump(&cmp, 8, tmp);
        }

        mbuffer_free(&cmp);
#endif

        ANALYSIS_QUEUE(0, IO_WARN, "volume_read");

        return 0;
#if RAMDISK_ENABLE
err_free2:
        mbuffer_free(&cmp);
#endif
err_ret:
        return ret;
}

/**
 * Forward a table read to the volume's table1.
 *
 * @return whatever table1->table_read() returns
 */
int volume_proto_table_read(volume_proto_t *volume_proto, const chkid_t *chkid, buffer_t *buf, size_t size, off_t offset)
{
        table1_t *tbl = &volume_proto->table1;

        return tbl->table_read(tbl, chkid, buf, size, offset);
}

#if LICH_SNAPTREE_NEWALGO
/**
 * Snapshot read used when LICH_SNAPTREE_NEWALGO is enabled.
 *
 * Steps, as implemented:
 *  1. Look up the version of the snapshot named by io->id and build a list
 *     of snapshots with volume_proto_snapshot_buildlist() (from
 *     fileinfo->snap_from to that version).
 *  2. Try each listed snapshot in order; ENOENT/ENOKEY means "nothing here,
 *     try the next one", the first successful read wins.
 *  3. If no snapshot supplied the data, read it from the volume itself, then
 *     probe the snapshot at fileinfo->snap_from once more: if it holds the
 *     range by now (the original comment says "maybe cow changed"), its data
 *     replaces what was just read from the volume.
 *
 * @param volume_proto controller of the volume that owns the snapshot
 * @param io           request whose id is the snapshot's chunk id
 * @param buf          receives the data
 * @return 0 on success, otherwise the first unexpected error
 */
int volume_proto_snapshot_read_newalgo(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf)
{
        int ret, found = 0;
        table1_t *table1;
        fileinfo_t *fileinfo;
        uint64_t snapver;
        io_t newio;
        struct list_head list;
        struct list_head *pos;
        snap_t *snap;

        buffer_t newbuf;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];

        ANALYSIS_BEGIN(0);

        table1 = &volume_proto->table1;
        fileinfo = &table1->fileinfo;
        INIT_LIST_HEAD(&list);

        ret = table1->snapshot_getversion(table1, &io->id, &snapver);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = volume_proto_snapshot_buildlist(volume_proto, fileinfo->snap_from, snapver, 0, &list);
        if (unlikely(ret))
                GOTO(err_ret, ret);


        ANALYSIS_BEGIN(1);
        io_init(&newio, &io->id, NULL, io->offset, io->size, io->flags);
        list_for_each(pos, &list) {
                snap = (void *)pos;

                newio.id = snap->chkinfo->id;
                ret = volume_proto_snapshot_read_remote(&snap->chkinfo->diskid[0].id, &newio, buf, FALSE);
                if (unlikely(ret)) {
                        if (ret == ENOENT || ret == ENOKEY) {
                                /* this snapshot does not hold the range */
                                continue;
                        } else {
                                ANALYSIS_END(1, IO_WARN, NULL);
                                GOTO(err_ret, ret);
                        }
                }

                found = 1;
                break;
        }

        ANALYSIS_END(1, IO_WARN, NULL);

        if (!found) {
                newio.id = volume_proto->chkid;
                ret = volume_proto_read(volume_proto, &newio, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* ===== maybe cow changed ===== */
                chkinfo = (void *)_chkinfo;
                ret = table1->snapshot_getbyversion(table1, fileinfo->snap_from, chkinfo, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                mbuffer_init(&newbuf, 0);
                newio.id = chkinfo->id;
                ret = volume_proto_snapshot_read_remote(&chkinfo->diskid[0].id, &newio, &newbuf, FALSE);
                if (likely(ret)) {
                        /* drop whatever the failed read may have left in the scratch buffer */
                        mbuffer_free(&newbuf);

                        if (likely(ret == ENOKEY))
                                goto out;       /* still no copy in the snapshot: keep the volume data */
                        else
                                GOTO(err_ret, ret);
                }

                /* the snapshot has the range now: swap its data into buf */
                mbuffer_pop(buf, NULL, buf->len);
                mbuffer_pop(&newbuf, buf, newbuf.len);
                /* ===== end ===== */
        }

out:

        ANALYSIS_END(0, IO_WARN, NULL);
        __snapshot_list_free(&list);
        return 0;
err_ret:
        __snapshot_list_free(&list);
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}
#endif

/**
 * Read a range as seen by a snapshot.
 *
 * Starts at the given snapshot and proceeds downstream; if the data cannot
 * be read there, continues via the common junction point onto the branch the
 * volume is on.
 *
 * Concretely: the current snapshot is asked for the range; ENOKEY means it
 * does not hold it, so the walk moves on with snapshot_next(). When there is
 * no next snapshot (ENOENT) the data comes from the volume: through
 * volume_proto_snapshot_read_newalgo() when LICH_SNAPTREE_NEWALGO is set,
 * otherwise by reading the volume and then re-probing the last snapshot in
 * case a copy-on-write landed there in the meantime.
 *
 * @param volume_proto controller of the volume the snapshot belongs to, not
 *                     a snapshot controller
 * @param io           request whose id is the snapshot's chkid
 * @param buf          receives the data
 * @return 0 on success, otherwise the first unexpected error
 */
int volume_proto_snapshot_read(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf)
{
        int ret;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];
        const nid_t *nid;
        table1_t *table1;
        io_t newio;
        fileid_t snapid;
        buffer_t newbuf;

        DBUG("snapshot "CHKID_FORMAT" off %ju size %u\n",
             CHKID_ARG(&volume_proto->chkid), io->offset, io->size);

        ANALYSIS_BEGIN(0);

        table1 = &volume_proto->table1;
        chkinfo = (void *)_chkinfo;

        io_init(&newio, &io->id, NULL, io->offset, io->size, io->flags);
        while (1) {
                /* the id here is a snapshot id; the callee tells volume ids and snapshot ids apart */
                ret = table1->chunk_getinfo(table1, &newio.id, chkinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                nid = &chkinfo->diskid[0].id;
                ret = volume_proto_snapshot_read_remote(nid, &newio, buf, FALSE);
                if (!ret)
                        break;                  /* this snapshot holds the range */

                if (ret != ENOKEY)
                        GOTO(err_ret, ret);

                /* not in this snapshot: remember it and step to the next one */
                snapid = newio.id;
                ret = table1->snapshot_next(table1, &newio.id, &newio.id, NULL, NULL);
                if (!ret)
                        continue;

                if (ret != ENOENT)
                        GOTO(err_ret, ret);

                /* no further snapshot: fall back to the volume */
#if LICH_SNAPTREE_NEWALGO
                (void) snapid;
                (void) newbuf;
                ret = volume_proto_snapshot_read_newalgo(volume_proto, io, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
#else
                newio.id = volume_proto->chkid;
                ret = volume_proto_read(volume_proto, &newio, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* ===== maybe cow changed ===== */
                mbuffer_init(&newbuf, 0);
                newio.id = snapid;
                ret = volume_proto_snapshot_read_remote(nid, &newio, &newbuf, FALSE);
                if (ret) {
                        /* drop whatever the failed read may have left in the scratch buffer */
                        mbuffer_free(&newbuf);

                        if (ret == ENOKEY)
                                break;          /* still nothing in the snapshot: keep the volume data */
                        else
                                GOTO(err_ret, ret);
                }

                /* the snapshot has the range now: swap its data into buf */
                mbuffer_pop(buf, NULL, buf->len);
                mbuffer_pop(&newbuf, buf, newbuf.len);
                /* ===== end ===== */
#endif

                break;
        }

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}

#if LICH_SNAPTREE_NEWALGO
/**
 * Tell whether the range [offset, offset + size) differs between two
 * snapshots (LICH_SNAPTREE_NEWALGO variant).
 *
 * A snapshot list is built from the versions of @snapdst and @snapsrc; the
 * source version must be present in that list (EINVAL otherwise). The list
 * is then walked from its head until the destination version is reached,
 * and the first snapshot that holds the range marks it as different.
 * ENOKEY from a snapshot means it does not hold the range.
 *
 * @param volume_proto controller of the volume that owns the snapshots
 * @param snapsrc      source snapshot id
 * @param snapdst      destination snapshot id
 * @param size         length of the range
 * @param offset       start of the range
 * @param diff         out: 1 if a snapshot on the way holds the range, else 0;
 *                     only written once the list has been validated
 * @return 0 on success, EINVAL if the source version is not in the list,
 *         otherwise the first unexpected error
 */
int __volume_proto_snapshot_isdiff(volume_proto_t *volume_proto, const fileid_t *snapsrc,
                const fileid_t *snapdst, size_t size, off_t offset, int *diff)
{
        int ret, found = 0;
        io_t newio;
        buffer_t buf;
        table1_t *table1;
        uint64_t ver_src, ver_dst;

        struct list_head list;
        struct list_head *pos;
        snap_t *snap;

        table1 = &volume_proto->table1;
        INIT_LIST_HEAD(&list);
        /* initialised before the first error exit so err_ret can always free it */
        mbuffer_init(&buf, 0);

        ret = table1->snapshot_getversion(table1, snapsrc, &ver_src);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table1->snapshot_getversion(table1, snapdst, &ver_dst);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = volume_proto_snapshot_buildlist(volume_proto, ver_dst, ver_src, 0, &list);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the source snapshot has to be part of the list */
        list_for_each(pos, &list) {
                snap = (void *)pos;

                if (snap->snap_version == ver_src) {
                        found = 1;
                        break;
                }
        }

        if (!found) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        *diff = 0;
        io_init(&newio, snapsrc, NULL, offset, size, 0);

        list_for_each(pos, &list) {
                snap = (void *)pos;

                if (snap->snap_version == ver_dst)
                        break;

                newio.id = snap->chkinfo->id;
                ret = volume_proto_snapshot_read_remote(&snap->chkinfo->diskid[0].id, &newio, &buf, TRUE);
                if (unlikely(ret)) {
                        if (ret == ENOKEY) {
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }

                /* a snapshot before the destination holds the range */
                *diff = 1;
                break;
        }

        mbuffer_free(&buf);
        __snapshot_list_free(&list);
        return 0;
err_ret:
        mbuffer_free(&buf);
        __snapshot_list_free(&list);
        return ret;
}

#else

/**
 * Tell whether the range [offset, offset + size) differs between two
 * snapshots (variant used when LICH_SNAPTREE_NEWALGO is off).
 *
 * Starting at @snapsrc, snapshots are visited with snapshot_next() while
 * their version is below the version of @snapdst. The first visited
 * snapshot that holds the range marks it as different; ENOKEY from a
 * snapshot means it does not hold the range, ENOENT from snapshot_next()
 * ends the walk.
 *
 * @param volume_proto controller of the volume that owns the snapshots
 * @param snapsrc      snapshot the walk starts from
 * @param snapdst      snapshot whose version ends the walk
 * @param size         length of the range
 * @param offset       start of the range
 * @param diff         out: 1 if a visited snapshot holds the range, else 0
 * @return 0 on success, otherwise the first unexpected error
 */
STATIC int __volume_proto_snapshot_isdiff(volume_proto_t *volume_proto, const fileid_t *snapsrc,
                const fileid_t *snapdst, size_t size, off_t offset, int *diff)
{
        int ret;
        fileid_t id;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];
        const nid_t *nid;
        table1_t *table1;
        uint64_t ver_src, ver_dst;
        buffer_t buf;
        io_t io;

        *diff = 0;
        mbuffer_init(&buf, 0);

        table1 = &volume_proto->table1;
        ret = table1->snapshot_getversion(table1, snapdst, &ver_dst);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /*
         * NOTE(review): io.id stays at snapsrc for the whole walk while the
         * target node follows 'id' - confirm that this is intended.
         */
        io_init(&io, snapsrc, NULL, offset, size, 0);
        id = *snapsrc;
        chkinfo = (void *)_chkinfo;
        while (1) {
                ret = table1->snapshot_getversion(table1, &id, &ver_src);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* reached (or passed) the destination version: nothing differed */
                if (ver_src >= ver_dst) {
                        break;
                }

                ret = table1->chunk_getinfo(table1, &id, chkinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                nid = &chkinfo->diskid[0].id;
                ret = volume_proto_snapshot_read_remote(nid, &io, &buf, TRUE);
                if (!ret) {
                        *diff = 1;
                        break;
                }

                if (ret != ENOKEY)
                        GOTO(err_ret, ret);

                /* not in this snapshot: step to the next one */
                ret = table1->snapshot_next(table1, &id, &id, NULL, NULL);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                                break;
                        } else
                                GOTO(err_ret, ret);
                }
        }

        mbuffer_free(&buf);

        return 0;
err_ret:
        /* release the scratch buffer on failure too, as the success path does */
        mbuffer_free(&buf);
        return ret;
}
#endif

/**
 * Flatten one chunk of a cloned volume: if the volume does not have the
 * chunk with index @idx yet, clone it from the source snapshot.
 *
 * @param volume_proto controller of the cloned volume
 * @param idx          chunk index, turned into a chunk id with fid2cid()
 * @param thin         forwarded to volume_proto_snapshot_chunk_clone()
 * @return 0 on success or when the chunk already exists; EPERM when the
 *         volume lacks __FILE_ATTR_CLONE__; otherwise the error of the
 *         existence check or of the clone
 *
 * @note Thin provisioning must be preserved.
 */
int volume_proto_snapshot_flat_bh1(volume_proto_t *volume_proto, int idx, int thin)
{
        int ret, retry = 0, exist = 0;
        table1_t *table1;
        chkid_t chkid;
        table1 = &volume_proto->table1;

        /* only cloned volumes can be flattened */
        if (!(table1->fileinfo.attr & __FILE_ATTR_CLONE__)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        fid2cid(&chkid, &table1->fileinfo.id, idx);

        // flat works like the write path: if the volume lacks the chunk, clone it from the source snapshot
        ret = volume_proto_chunk_exist(volume_proto, &chkid, &exist);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (!exist) {
                /* counted once per chunk, before the first attempt (retries do not count again) */
                volume_proto->flat_ctx.stat.clone++;
retry:
                //ANALYSIS_BEGIN(0);

                ret = volume_proto_snapshot_chunk_clone(volume_proto, &chkid, thin);
                if (unlikely(ret)) {
                        if (ret == EAGAIN) {
                                /*
                                 * presumably sleeps 100ms and jumps back to 'retry' a
                                 * limited number of times before giving up via err_ret -
                                 * TODO confirm against the USLEEP_RETRY1 definition
                                 */
                                USLEEP_RETRY1(err_ret, ret, retry, retry, 2, (100 * 1000));
                        } else
                                GOTO(err_ret, ret);
                }

                //ANALYSIS_END(0, IO_WARN, "snapshot_chunk_clone");
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Switch flattening of a cloned volume on or off.
 *
 * The request is passed to table1->snapshot_flat(); on success the volume's
 * flat_ctx is reset to all zeroes.
 *
 * @param volume_proto controller of the cloned volume
 * @param on           forwarded to table1->snapshot_flat()
 * @return 0 on success, EPERM when the volume lacks __FILE_ATTR_CLONE__,
 *         otherwise the snapshot_flat() error
 */
int volume_proto_snapshot_flat(volume_proto_t *volume_proto, int on)
{
        int ret;
        table1_t *table1 = &volume_proto->table1;

        /* only cloned volumes have a source to flatten from */
        if ((table1->fileinfo.attr & __FILE_ATTR_CLONE__) == 0) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        ret = table1->snapshot_flat(table1, on);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(&volume_proto->flat_ctx, 0x0, sizeof(volume_proto->flat_ctx));

        return 0;
err_ret:
        return ret;
}

/**
 * Return the data of a range only if it differs between two snapshots.
 *
 * __volume_proto_snapshot_isdiff() decides whether the range differs between
 * @fileid (used as the source snapshot) and @snapdst. If it does, the range
 * is read as seen by @snapdst into @buf.
 *
 * @return 0 with @buf filled when the range differs; ENOENT when it does
 *         not; otherwise the error of the diff check or of the read
 */
int volume_proto_snapshot_diff(volume_proto_t *volume_proto, const fileid_t *fileid,
                const fileid_t *snapdst, buffer_t *buf, size_t size, off_t offset)
{
        int ret, changed;
        io_t rdio;

        ret = __volume_proto_snapshot_isdiff(volume_proto, fileid, snapdst,
                        size, offset, &changed);
        if (ret)
                GOTO(err_ret, ret);

        io_init(&rdio, snapdst, NULL, offset, size, 0);

        /* an unchanged range is reported as "no entry" */
        if (!changed) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        ret = volume_proto_snapshot_read(volume_proto, &rdio, buf);
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
